package com.ihr360.job.core.step;

import com.ihr360.job.core.BatchStatus;
import com.ihr360.job.core.ExitStatus;
import com.ihr360.job.core.JobInterruptedException;
import com.ihr360.job.core.Step;
import com.ihr360.job.core.StepExecutionListener;
import com.ihr360.job.core.UnexpectedJobExecutionException;
import com.ihr360.job.core.entity.StepExecution;
import com.ihr360.job.core.item.ExecutionContext;
import com.ihr360.job.core.job.builder.JobBuilderHelper;
import com.ihr360.job.core.lanuch.ExitCodeMapper;
import com.ihr360.job.core.lanuch.NoSuchJobException;
import com.ihr360.job.core.listener.CompositeStepExecutionListener;
import com.ihr360.job.core.repeat.RepeatException;
import com.ihr360.job.core.repository.JobRepository;
import com.ihr360.job.core.scope.context.StepSynchronizationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

import java.util.Date;

/**
 * Base class for {@link Step} implementations. It supplies the common execution template in
 * {@link #execute(StepExecution)}: status bookkeeping through the {@link JobRepository}, listener callbacks, the
 * {@link #open(ExecutionContext)} / {@link #close(ExecutionContext)} resource hooks and failure-to-status mapping.
 * Subclasses provide the business logic by implementing {@link #doExecute(StepExecution)}.
 */
public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware {
    // Logger category is this class (it previously pointed at the unrelated JobBuilderHelper). The field stays
    // protected and non-static so subclasses that already reference it keep compiling.
    protected final Logger logger = LoggerFactory.getLogger(AbstractStep.class);

    private String name;

    private int startLimit = Integer.MAX_VALUE;

    private boolean allowStartIfComplete = false;

    private CompositeStepExecutionListener stepExecutionListener = new CompositeStepExecutionListener();

    private JobRepository jobRepository;

    /**
     * Default constructor.
     */
    public AbstractStep() {
        super();
    }

    /**
     * Convenient constructor for setting only the name property.
     *
     * @param name the step name
     */
    public AbstractStep(String name) {
        this.name = name;
    }

    /**
     * Verifies the mandatory configuration: a name and a {@link JobRepository}.
     *
     * @throws Exception if either mandatory property is missing (raised by the state assertions)
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.state(name != null, "A Step must have a name");
        Assert.state(jobRepository != null, "JobRepository is mandatory");
    }

    @Override
    public String getName() {
        return this.name;
    }

    /**
     * Set the name property. Always overrides the default value if this object is a Spring bean.
     *
     * @param name the step name
     * @see #setBeanName(java.lang.String)
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Set the name property if it is not already set. Because of the order of the callbacks in a Spring container the
     * name property will be set first if it is present. Care is needed with bean definition inheritance - if a parent
     * bean has a name, then its children need an explicit name as well, otherwise they will not be unique.
     *
     * @param name the bean name, used as the step name only when no name has been set yet
     * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
     */
    @Override
    public void setBeanName(String name) {
        if (this.name == null) {
            this.name = name;
        }
    }

    @Override
    public int getStartLimit() {
        return this.startLimit;
    }

    /**
     * Public setter for the startLimit. A value of {@code 0} is stored as {@link Integer#MAX_VALUE}; any other value
     * is stored unchanged.
     *
     * @param startLimit the startLimit to set
     */
    public void setStartLimit(int startLimit) {
        this.startLimit = startLimit == 0 ? Integer.MAX_VALUE : startLimit;
    }

    @Override
    public boolean isAllowStartIfComplete() {
        return this.allowStartIfComplete;
    }

    /**
     * Public setter for flag that determines whether the step should start again if it is already complete. Defaults to
     * false.
     *
     * @param allowStartIfComplete the value of the flag to set
     */
    public void setAllowStartIfComplete(boolean allowStartIfComplete) {
        this.allowStartIfComplete = allowStartIfComplete;
    }

    /**
     * Extension point for subclasses to execute business logic. Subclasses should set the {@link ExitStatus} on the
     * {@link StepExecution} before returning.
     *
     * @param stepExecution the current step context
     * @throws Exception any failure; it is recorded on the step execution by {@link #execute(StepExecution)}
     */
    protected abstract void doExecute(StepExecution stepExecution) throws Exception;

    /**
     * Extension point for subclasses to provide callbacks to their collaborators at the beginning of a step, to open or
     * acquire resources. Does nothing by default.
     *
     * @param ctx the {@link ExecutionContext} to use
     * @throws Exception if resources cannot be opened
     */
    protected void open(ExecutionContext ctx) throws Exception {
    }

    /**
     * Extension point for subclasses to provide callbacks to their collaborators at the end of a step (near the end
     * of the finally block), to close or release resources. Does nothing by default.
     *
     * @param ctx the {@link ExecutionContext} to use
     * @throws Exception if resources cannot be closed
     */
    protected void close(ExecutionContext ctx) throws Exception {
    }

    /**
     * Template method for step execution logic - calls abstract methods for resource initialization (
     * {@link #open(ExecutionContext)}), execution logic ({@link #doExecute(StepExecution)}) and resource closing (
     * {@link #close(ExecutionContext)}).
     * <p>
     * Any {@link Throwable} raised between the {@code beforeStep} callback and the completion of
     * {@link #doExecute(StepExecution)} is caught, mapped to a {@link BatchStatus} and {@link ExitStatus}, and
     * recorded on the step execution rather than propagated. The registration made by
     * {@link #doExecutionRegistration(StepExecution)} is always undone via {@link #doExecutionRelease()}, even when
     * the cleanup sequence itself fails unexpectedly.
     *
     * @param stepExecution the execution to run and update
     */
    @Override
    public final void execute(StepExecution stepExecution) throws JobInterruptedException,
            UnexpectedJobExecutionException {

        if (logger.isDebugEnabled()) {
            logger.debug("Executing: id=" + stepExecution.getId());
        }
        stepExecution.setStartTime(new Date());
        stepExecution.setStatus(BatchStatus.STARTED);
        getJobRepository().update(stepExecution);

        // Start with a default value that will be trumped by anything
        ExitStatus exitStatus = ExitStatus.EXECUTING;

        doExecutionRegistration(stepExecution);

        try {
            getCompositeListener().beforeStep(stepExecution);
            open(stepExecution.getExecutionContext());

            try {
                doExecute(stepExecution);
            }
            catch (RepeatException e) {
                // Unwrap so the underlying failure is what gets recorded; if there is no cause, keep the
                // RepeatException itself instead of failing with a NullPointerException from "throw null".
                Throwable cause = e.getCause();
                if (cause == null) {
                    throw e;
                }
                throw cause;
            }
            exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());

            // Check if someone is trying to stop us
            if (stepExecution.isTerminateOnly()) {
                throw new JobInterruptedException("JobExecution interrupted.");
            }

            // Need to upgrade here not set, in case the execution was stopped
            stepExecution.upgradeStatus(BatchStatus.COMPLETED);
            if (logger.isDebugEnabled()) {
                logger.debug("Step execution success: id=" + stepExecution.getId());
            }
        }
        catch (Throwable e) {
            stepExecution.upgradeStatus(determineBatchStatus(e));
            exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
            stepExecution.addFailureException(e);
            if (stepExecution.getStatus() == BatchStatus.STOPPED) {
                // An interruption is an expected way to end a step, so it is logged at info level only
                logger.info(String.format("Encountered interruption executing step %s in job %s : %s", name, stepExecution.getJobExecution().getJobInstance().getJobName(), e.getMessage()));
                if (logger.isDebugEnabled()) {
                    logger.debug("Full exception", e);
                }
            }
            else {
                logger.error(String.format("Encountered an error executing step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
            }
        }
        finally {

            try {
                try {
                    // Update the step execution to the latest known value so the
                    // listeners can act on it
                    exitStatus = exitStatus.and(stepExecution.getExitStatus());
                    stepExecution.setExitStatus(exitStatus);
                    exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
                }
                catch (Exception e) {
                    logger.error(String.format("Exception in afterStep callback in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
                }

                try {
                    getJobRepository().updateExecutionContext(stepExecution);
                }
                catch (Exception e) {
                    stepExecution.setStatus(BatchStatus.UNKNOWN);
                    exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
                    stepExecution.addFailureException(e);
                    logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
                            + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
                }

                stepExecution.setEndTime(new Date());
                stepExecution.setExitStatus(exitStatus);

                try {
                    getJobRepository().update(stepExecution);
                }
                catch (Exception e) {
                    stepExecution.setStatus(BatchStatus.UNKNOWN);
                    stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
                    stepExecution.addFailureException(e);
                    logger.error(String.format("Encountered an error saving batch meta data for step %s in job %s. "
                            + "This job is now in an unknown state and should not be restarted.", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
                }

                try {
                    close(stepExecution.getExecutionContext());
                }
                catch (Exception e) {
                    logger.error(String.format("Exception while closing step execution resources in step %s in job %s", name, stepExecution.getJobExecution().getJobInstance().getJobName()), e);
                    stepExecution.addFailureException(e);
                }
            }
            finally {
                // Always undo the registration made before the try block, even if something unexpected
                // (e.g. an Error, or a failure while building a log message) escapes the cleanup above.
                doExecutionRelease();
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Step execution complete: " + stepExecution.getSummary());
            }
        }
    }

    /**
     * Releases the most recent {@link StepExecution} from the {@link StepSynchronizationManager}.
     */
    protected void doExecutionRelease() {
        StepSynchronizationManager.release();
    }

    /**
     * Registers the {@link StepExecution} with the {@link StepSynchronizationManager} for the duration of the
     * execution.
     *
     * @param stepExecution the execution to register
     */
    protected void doExecutionRegistration(StepExecution stepExecution) {
        StepSynchronizationManager.register(stepExecution);
    }

    /**
     * Determine the step status based on the exception.
     *
     * @param e the failure raised during execution
     * @return {@link BatchStatus#STOPPED} if the failure or its direct cause is a {@link JobInterruptedException},
     * otherwise {@link BatchStatus#FAILED}
     */
    private static BatchStatus determineBatchStatus(Throwable e) {
        if (e instanceof JobInterruptedException || e.getCause() instanceof JobInterruptedException) {
            return BatchStatus.STOPPED;
        }
        else {
            return BatchStatus.FAILED;
        }
    }

    /**
     * Register a step listener for callbacks at the appropriate stages in a step execution.
     *
     * @param listener a {@link StepExecutionListener}
     */
    public void registerStepExecutionListener(StepExecutionListener listener) {
        this.stepExecutionListener.register(listener);
    }

    /**
     * Register each of the objects as listeners.
     *
     * @param listeners an array of listener objects of known types.
     */
    public void setStepExecutionListeners(StepExecutionListener[] listeners) {
        for (StepExecutionListener listener : listeners) {
            registerStepExecutionListener(listener);
        }
    }

    /**
     * @return composite listener that delegates to all registered listeners.
     */
    protected StepExecutionListener getCompositeListener() {
        return stepExecutionListener;
    }

    /**
     * Public setter for {@link JobRepository}.
     *
     * @param jobRepository is a mandatory dependence (no default).
     */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /**
     * @return the {@link JobRepository} used to persist step execution state
     */
    protected JobRepository getJobRepository() {
        return jobRepository;
    }

    @Override
    public String toString() {
        return ClassUtils.getShortName(getClass()) + ": [name=" + name + "]";
    }

    /**
     * Default mapping from throwable to {@link ExitStatus}. Clients can modify the exit code using a
     * {@link StepExecutionListener}.
     *
     * @param ex the cause of the failure
     * @return an {@link ExitStatus}: stopped for an interruption, the {@link ExitCodeMapper#NO_SUCH_JOB} code for a
     * {@link NoSuchJobException}, and failed (with the exception as description) for anything else
     */
    private ExitStatus getDefaultExitStatusForFailure(Throwable ex) {
        ExitStatus exitStatus;
        if (ex instanceof JobInterruptedException || ex.getCause() instanceof JobInterruptedException) {
            exitStatus = ExitStatus.STOPPED.addExitDescription(JobInterruptedException.class.getName(),ex.getMessage());
        }
        else if (ex instanceof NoSuchJobException || ex.getCause() instanceof NoSuchJobException) {
            exitStatus = new ExitStatus(ExitCodeMapper.NO_SUCH_JOB, ex.getClass().getName());
        }
        else {
            exitStatus = ExitStatus.FAILED.addExitDescription(ex);
        }

        return exitStatus;
    }
}